Skip to content

Commit

Permalink
Merge pull request #173 from scipp/roi
Browse files Browse the repository at this point in the history
Add ROIFilter
  • Loading branch information
SimonHeybrock authored Feb 7, 2025
2 parents b7cda71 + 1aa5e92 commit 23df4f6
Show file tree
Hide file tree
Showing 4 changed files with 603 additions and 11 deletions.
98 changes: 87 additions & 11 deletions src/ess/reduce/live/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
flatten dimensions of the data.
"""

from __future__ import annotations

from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from math import ceil
Expand All @@ -38,6 +40,8 @@
)
from ess.reduce.nexus.workflow import GenericNeXusWorkflow

from . import roi

CalibratedPositionWithNoisyReplicas = NewType(
'CalibratedPositionWithNoisyReplicas', sc.Variable
)
Expand Down Expand Up @@ -73,10 +77,14 @@ def __init__(
self._coords = coords
self._edges = edges

@property
def replicas(self) -> int:
    """Number of coordinate replicas; ``__call__`` cycles through them."""
    return self._replicas

@staticmethod
def from_coords(
coords: ProjectedCoords, resolution: DetectorViewResolution
) -> 'Histogrammer':
) -> Histogrammer:
"""
Create a histogrammer from coordinates and resolution.
Expand All @@ -102,7 +110,20 @@ def from_coords(
def __call__(self, da: sc.DataArray) -> sc.DataArray:
    """Histogram the given data onto the projection plane.

    Each call advances to the next coordinate replica, cycling through
    the stored replicas.
    """
    self._current += 1
    replica = self._current % self._replicas
    selected = self._coords[self._replica_dim, replica]
    # Multi-dim input dims cannot be preserved by hist, so flatten first.
    table = sc.DataArray(da.data, coords=selected).flatten(to='_')
    return table.hist(self._edges)

def input_indices(self) -> sc.DataArray:
    """Return an array with input indices corresponding to each histogram bin."""
    dim = 'detector_number'
    # For some projections one of the coords is a scalar; broadcast to
    # obtain a flat table of per-pixel coordinates.
    flat = self._coords.broadcast(sizes=self._coords.sizes).flatten(to=dim).copy()
    size = flat.sizes[dim]
    # All replicas of a pixel map back to the same input index.
    n_input = sc.index(size // self._replicas)
    index_table = sc.DataArray(
        sc.arange(dim, size, dtype='int64', unit=None) % n_input,
        coords=flat,
    )
    binned = index_table.bin(self._edges)
    return sc.DataArray(binned.bins.data, coords=self._edges)


@dataclass
Expand Down Expand Up @@ -149,20 +170,27 @@ def __init__(self, detector_number: sc.Variable):
sc.zeros(sizes=detector_number.sizes, unit='counts', dtype='int32'),
coords={'detector_number': detector_number},
)
self._detector_number = detector_number
self._flat_detector_number = detector_number.flatten(to='event_id')
self._start = int(self._flat_detector_number[0].value)
self._stop = int(self._flat_detector_number[-1].value)
self._size = int(self._flat_detector_number.size)
if not sc.issorted(self._flat_detector_number, dim='event_id'):
raise ValueError("Detector numbers must be sorted.")
if self._stop - self._start + 1 != self._size:
raise ValueError("Detector numbers must be consecutive.")
self._sorted = sc.issorted(self._flat_detector_number, dim='event_id')
self._consecutive = self._stop - self._start + 1 == self._size

@property
def detector_number(self) -> sc.Variable:
    """Detector-number variable this detector was constructed with."""
    return self._detector_number

@property
def data(self) -> sc.DataArray:
    """Accumulated counts, with a 'detector_number' coordinate."""
    return self._data

def bincount(self, data: Sequence[int]) -> sc.DataArray:
if not self._sorted:
raise ValueError("Detector numbers must be sorted to use `bincount`.")
if not self._consecutive:
raise ValueError("Detector numbers must be consecutive to use `bincount`.")
offset = np.asarray(data, dtype=np.int32) - self._start
# Ignore events with detector numbers outside the range of the detector. This
# should not happen in valid files but for now it is useful until we are sure
Expand Down Expand Up @@ -215,8 +243,16 @@ def __init__(
self._current = 0
self._history: sc.DataArray | None = None
self._cache: sc.DataArray | None = None
self.clear_counts()

counts = self.bincount([])
def clear_counts(self) -> None:
"""
Clear counts.
Overrides Detector.clear_counts, to properly clear sliding window history and
cache.
"""
counts = sc.zeros_like(self.data)
if self._projection is not None:
counts = self._projection(counts)
self._history = (
Expand All @@ -226,12 +262,25 @@ def __init__(
)
self._cache = self._history.sum('window')

def make_roi_filter(self) -> roi.ROIFilter:
    """Return a ROI filter operating via the projection plane of the view."""
    projection = self._projection
    if isinstance(projection, Histogrammer):
        # Histogram bins may contain multiple replicas of a pixel, so the
        # filter normalizes by the replica count.
        return roi.ROIFilter(
            indices=projection.input_indices(), norm=projection.replicas
        )
    # Otherwise every output pixel maps to exactly one flat input index.
    ones = sc.ones(sizes=self.data.sizes, dtype='int32', unit=None)
    indices = sc.cumsum(ones, mode='exclusive')
    if isinstance(projection, LogicalView):
        indices = projection(indices)
    return roi.ROIFilter(indices=indices, norm=1.0)

@staticmethod
def from_detector_and_histogrammer(
detector: CalibratedDetector[SampleRun],
window: RollingDetectorViewWindow,
projection: Histogrammer,
) -> 'RollingDetectorView':
) -> RollingDetectorView:
"""Helper for constructing via a Sciline workflow."""
return RollingDetectorView(
detector_number=detector.coords['detector_number'],
Expand All @@ -244,7 +293,7 @@ def from_detector_and_logical_view(
detector: CalibratedDetector[SampleRun],
window: RollingDetectorViewWindow,
projection: LogicalView,
) -> 'RollingDetectorView':
) -> RollingDetectorView:
"""Helper for constructing via a Sciline workflow."""
return RollingDetectorView(
detector_number=detector.coords['detector_number'],
Expand All @@ -261,7 +310,7 @@ def from_nexus(
projection: Literal['xy_plane', 'cylinder_mantle_z'] | LogicalView,
resolution: dict[str, int] | None = None,
pixel_noise: Literal['cylindrical'] | sc.Variable | None = None,
) -> 'RollingDetectorView':
) -> RollingDetectorView:
"""
Create a rolling detector view from a NeXus file using GenericNeXusWorkflow.
Expand Down Expand Up @@ -343,8 +392,32 @@ def get(self, window: int | None = None) -> sc.DataArray:
data += self._history['window', 0 : self._current].sum('window')
return data

def add_events(self, data: sc.DataArray) -> None:
    """
    Add counts in the form of events grouped by pixel ID.

    Parameters
    ----------
    data:
        Events grouped by pixel ID, given by binned data.
    """
    per_pixel = data.bins.size()
    counts = per_pixel.to(dtype='int32', copy=False)
    counts.unit = 'counts'
    self._add_counts(counts)

def add_counts(self, data: Sequence[int]) -> None:
    """
    Add counts in the form of a sequence of pixel IDs.

    Parameters
    ----------
    data:
        List of pixel IDs.
    """
    self._add_counts(self.bincount(data))

def _add_counts(self, counts: sc.Variable) -> None:
if self._projection is not None:
counts = self._projection(counts)
self._cache -= self._history['window', self._current]
Expand Down Expand Up @@ -455,9 +528,12 @@ def position_noise_for_cylindrical_pixel(


def gaussian_position_noise(sigma: PositionNoiseSigma) -> PositionNoise:
    """Return a table of isotropic Gaussian position noise.

    The RNG is seeded, so repeated calls produce the same noise table.
    """
    width = sigma.to(unit='m', copy=False)
    rng = np.random.default_rng(seed=1234)
    samples = rng.normal(0.0, width.value, size=(_noise_size, 3))
    noise = sc.empty(sizes={'position': _noise_size}, unit='m', dtype=sc.DType.vector3)
    noise.values = samples
    return PositionNoise(noise)


Expand Down
115 changes: 115 additions & 0 deletions src/ess/reduce/live/roi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
"""Utilities for region of interest (ROI) selection."""

from __future__ import annotations

from typing import TypeVar

import numpy as np
import scipp as sc


def select_indices_in_intervals(
    intervals: sc.DataGroup[tuple[int, int] | tuple[sc.Variable, sc.Variable]],
    indices: sc.Variable | sc.DataArray,
) -> sc.Variable:
    """
    Return the subset of indices that falls within the intervals.

    Parameters
    ----------
    intervals:
        DataGroup with dimension names as keys and (low, high) tuples as values.
        This can be used to define a band or a rectangle to select. When low and
        high are scipp.Variable, the selection is done using label-based
        indexing. In this case `indices` must be a DataArray with corresponding
        coordinates.
    indices:
        Variable or DataArray with indices to select from. For binned data the
        selected indices are returned concatenated into a dense array.
    """
    out_dim = 'index'
    selected = indices
    for dim in intervals:
        # Sort so the slice is well-formed regardless of interval orientation.
        low, high = sorted(intervals[dim])
        selected = selected[dim, low:high]
    selected = selected.flatten(to=out_dim)
    if selected.bins is not None:
        # Binned data: concatenate the bin contents into a dense array.
        selected = selected.bins.concat().value
        selected = selected.rename_dims({selected.dim: out_dim})
    return selected


T = TypeVar('T', sc.DataArray, sc.Variable)


def apply_selection(
    data: T, *, selection: sc.Variable, norm: float = 1.0
) -> tuple[T, sc.Variable]:
    """
    Apply selection to data.

    Parameters
    ----------
    data:
        Data to filter.
    selection:
        Variable with indices to select.
    norm:
        Normalization factor to apply to the selected data. This is used for
        cases where indices may be selected multiple times.

    Returns
    -------
    :
        Filtered data and scale factor.
    """
    unique_indices, multiplicity = np.unique(selection.values, return_counts=True)
    if data.ndim != 1:
        # Flatten so the positional indexing below applies along a single dim.
        data = data.flatten(to='detector_number')
    scale = sc.array(dims=[data.dim], values=multiplicity) / norm
    return data[unique_indices], scale


class ROIFilter:
    """Filter for selecting a region of interest (ROI)."""

    def __init__(self, indices: sc.Variable | sc.DataArray, norm: float = 1.0) -> None:
        """
        Create a new ROI filter.

        Parameters
        ----------
        indices:
            Variable with indices to filter. The indices facilitate selecting a 2-D
            ROI in a projection of a 3-D dataset. Typically the indices are given by a
            2-D array. Each element in the array may correspond to a single index (when
            there is no projection) or a list of indices that were projected into an
            output pixel.
        norm:
            Normalization factor forwarded to :py:func:`apply_selection`, for cases
            where an input index may appear multiple times in `indices` (e.g., the
            replica count of a projection).
        """
        self._indices = indices
        # Before a ROI is set the filter selects nothing. Use an integer dtype:
        # the selection is consumed by apply_selection for positional indexing,
        # which a (numpy-default) float64 empty array would break.
        self._selection = sc.array(dims=['index'], values=[], dtype='int64', unit=None)
        self._norm = norm

    def set_roi_from_intervals(self, intervals: sc.DataGroup) -> None:
        """Set the ROI from (typically 1 or 2) intervals."""
        self._selection = select_indices_in_intervals(intervals, self._indices)

    def apply(self, data: T) -> tuple[T, sc.Variable]:
        """
        Apply the ROI filter to data.

        The returned scale factor can be used to handle filtering via a projection, to
        take into account that fractions of a source data point contribute to a data
        point in the projection.

        Parameters
        ----------
        data:
            Data to filter.

        Returns
        -------
        :
            Filtered data and scale factor.
        """
        return apply_selection(data, selection=self._selection, norm=self._norm)
Loading

0 comments on commit 23df4f6

Please sign in to comment.