Skip to content

Commit

Permalink
cleaning up interpolation code and creating missing unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
caitwolf committed Jan 22, 2024
1 parent b635bfd commit f39d0ba
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 146 deletions.
77 changes: 77 additions & 0 deletions sasdata/data_util/interpolations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""
Interpolation functions for 1d data sets.
"""

import numpy as np
from numpy.typing import ArrayLike
from typing import Optional, Union


def linear(x_interp: ArrayLike, x: ArrayLike, y: ArrayLike, dy: Optional[ArrayLike] = None)\
-> tuple[np.ndarray, Union[np.ndarray, None]]:
"""
Computes linear interpolation of dataset (x, y) at the points x_interp.
Error propagation is implemented when dy is provided.
Requires that min(x) <= x_interp <= max(x)
TODO: reductus package has a similar function in err1d if we add the additional dependency
"""
x_interp = np.array(x_interp)
sort = np.argsort(x)
x = np.array(x)[sort]
y = np.array(y)[sort]
dy = np.array(dy)[sort] if dy is not None else None

# find out where the interpolated points fit into the existing data
index_2 = np.searchsorted(x, x_interp)
index_1 = index_2 - 1

# linear interpolation of new y points
y_interp_1 = y[index_1] * (x_interp - x[index_2]) / (x[index_1] - x[index_2])
y_interp_2 = y[index_2] * (x_interp - x[index_1]) / (x[index_2] - x[index_1])
y_interp = y_interp_1 + y_interp_2

# error propagation
if dy is not None:
dy_interp_1 = dy[index_1] ** 2 * ((x_interp - x[index_2]) / (x[index_1] - x[index_2])) ** 2
dy_interp_2 = dy[index_2] ** 2 * ((x_interp - x[index_1]) / (x[index_2] - x[index_1])) ** 2
dy_interp = np.sqrt(dy_interp_1 + dy_interp_2)
else:
dy_interp = None

return y_interp, dy_interp


def linear_scales(x_interp: ArrayLike,
x: ArrayLike,
y: ArrayLike,
dy: Optional[ArrayLike] = None,
scale: Optional[str] = "linear") -> tuple[np.ndarray, Union[np.ndarray, None]]:
"""
Perform linear interpolation on different scales.
Error propagation is implemented when dy is provided.
Scale is set to "linear" by default.
Setting scale to "log" will perform the interpolation of (log(x), log(y)) at log(x_interp); log(y_interp) will be
converted back to y_interp in the return.
Returns (y_interp, dy_interp | None)
"""
x = np.array(x)
y = np.array(y)

if scale == "linear":
result = linear(x_interp=x_interp, x=x, y=y, dy=dy)
return result

elif scale == "log":
dy = np.array(dy) / y if dy is not None else None
x_interp = np.log(x_interp)
x = np.log(x)
y = np.log(y)
result = linear(x_interp=x_interp, x=x, y=y, dy=dy)
y_interp = np.exp(result[0])
dy_interp = None if result[1] is None else y_interp * result[1]
return y_interp, dy_interp


156 changes: 79 additions & 77 deletions sasdata/dataloader/data_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from typing import Optional

from sasdata.data_util.uncertainty import Uncertainty
from sasdata.data_util import interpolations


class plottable_1D(object):
Expand Down Expand Up @@ -54,14 +55,7 @@ class plottable_1D(object):
_yunit = ''

# operation data
_x_op = None
_y_op = None
_dx_op = None
_dy_op = None
_dxl_op = None
_dxw_op = None
_lam_op = None
_dlam_op = None
_operation = None # Data1D object that stores points used for operations

def __init__(self, x, y, dx=None, dy=None, dxl=None, dxw=None,
lam=None, dlam=None):
Expand Down Expand Up @@ -839,74 +833,82 @@ def copy_from_datainfo(self, data1d):
self.yaxis(data1d._yaxis, data1d._yunit)
self.title = data1d.title

def _interpolation_operation(self, other, tolerance: Optional[float] = 0.01):
def _interpolation_operation(self, other, tolerance: Optional[float] = 0.01, scale: Optional[str] = 'log'):
"""
Checks that x values for two datasets have overlapping ranges for an operation.
If so, _x_op, _y_op, _dx_op, _dy_op, _dxl_op, _dxw_op, _lam, _dlam for both self and other are updated to
values that will be used for the operation.
:param other: other data for operation
:param tolerance: acceptable deviation in matching x data points, default 0.01 (equivalent to 1 % deviation)
:param scale: default 'log', performs interpolation on log scale; use 'linear' for sesans data
:raise ValueError: x-ranges of self and other do not overlap
"""
# clear old interpolation arrays from previous operations on the data
# this should probably be done immediately following the operation but placing here for now
if isinstance(other, Data1D):
# check if ranges of self.x and other.x overlap at all
if np.min(other.x) > np.max(self.x) or np.max(other.x) < np.min(self.x):
msg = "Unable to perform operation: x-ranges do not overlap."
msg = "Unable to perform operation: x ranges of two datasets do not overlap."
raise ValueError(msg)
# check if data points match (within tolerance) in overlap range of self.x and other.x
# if we start to lose self.x values in the overlap region (i.e., points do not match up)
# this will fail and interpolation of the other dataset is performed
self_overlap = np.abs((self.x[:, None] - other.x[None, :]) / self.x[:, None]).min(axis=1) <= tolerance
self_overlap_pts = np.flatnonzero(self_overlap)
if len(self_overlap_pts) == len(self.x[self_overlap_pts.min():self_overlap_pts.max()+1]):
x_interp = self.x[self_overlap]
interp_mask = self_overlap # mask for self.x to select overlap region between datasets
match_pts = np.abs(x_interp[:, None] - other.x[None, :]).argmin(axis=1)
y_interp = np.copy(other.y)[match_pts]
other_dy = np.zeros(y_interp.size) if other.dy is None else np.copy(other.dy)[match_pts]
other_dx = None if other.dx is None else np.copy(other.dx)[match_pts]
other_dxl = None if other.dxl is None else np.copy(other.dxl)[match_pts]
other_dxw = None if other.dxw is None else np.copy(other.dxw)[match_pts]
# we need to interpolate the data
# check if data points match (within tolerance) across the overlap range of self.x and other.x
self_overlap_bool = np.abs((self.x[:, None] - other.x[None, :]) / self.x[:, None]).min(axis=1) <= tolerance
self_overlap_index = np.flatnonzero(self_overlap_bool)
if len(self_overlap_index) == self_overlap_index.max() - self_overlap_index.min() + 1:
# all the points in overlap region of self.x found close matches in overlap region of other.x
x_op = self.x[self_overlap_bool]
other._operation = other.clone_without_data(x_op.size)
other_overlap_index = (np.abs(x_op[:, None] - other.x[None, :])).argmin(axis=1)
y_op_other = np.copy(other.y)[other_overlap_index]
dy_op_other = np.zeros(y_op_other.size) if other.dy is None else np.copy(other.dy)[other_overlap_index]
dx_op_other = None if other.dx is None else np.copy(other.dx)[other_overlap_index]
dxl_op_other = None if other.dxl is None else np.copy(other.dxl)[other_overlap_index]
dxw_op_other = None if other.dxw is None else np.copy(other.dxw)[other_overlap_index]
lam_op_other = None if other.lam is None else np.copy(other.lam)[other_overlap_index]
dlam_op_other = None if other.dlam is None else np.copy(other.dlam)[other_overlap_index]
else:
self_overlap = np.zeros(self.x.size, dtype=bool)
self_overlap[self_overlap_pts.min():self_overlap_pts.max()+1] = True
x_interp = np.copy(self.x)[self_overlap_pts.min():self_overlap_pts.max()+1]
interp_mask = self_overlap
# linear interpolation on a log scale
y_interp = np.power(10, np.interp(np.log10(x_interp), np.log10(other.x), np.log10(other.y)))
other_dy = np.zeros(y_interp.size)
# unsure if the following is correct, but setting resolutions to None if data is interpolated
other_dx = None
other_dxl = None
other_dxw = None

other._x_op = x_interp
other._y_op = y_interp
other._dy_op = other_dy
other._dx_op = other_dx
other._dxl_op = other_dxl
other._dxw_op = other_dxw
# make sure other parameters are cleared from previous operations
other._lam_op = None
other._dlam_op = None
# not all the points found a close match so implementing interpolation on log scale
self_overlap_bool = (self.x >= max([self.x.min(), other.x.min()])) & (self.x <= min([self.x.max(), other.x.max()]))
self_overlap_index = np.flatnonzero(self_overlap_bool)
x_op = self.x[self_overlap_bool]
other._operation = other.clone_without_data(x_op.size)
if scale == 'log':
y_op_other, dy_op_other = \
interpolations.linear_scales(x_interp=x_op, x=other.x, y=other.y, dy=other.dy, scale='log')
else:
y_op_other, dy_op_other = interpolations.linear(x_interp=x_op, x=other.x, y=other.y, dy=other.dy)

# setting resolutions and wavelength parameters to None if data is interpolated
# TODO: determine proper propagation of resolution through interpolation
dx_op_other = None
dxl_op_other = None
dxw_op_other = None
lam_op_other = None
dlam_op_other = None

other._operation.x = x_op
other._operation.y = y_op_other
other._operation.dy = dy_op_other if dy_op_other is not None else np.zeros(x_op.size)
other._operation.dx = dx_op_other
other._operation.dxl = dxl_op_other
other._operation.dxw = dxw_op_other
other._operation.lam = lam_op_other
other._operation.dlam = dlam_op_other

else:
# other is something besides Data1D and so all points in self should be used for operation
# don't mess with the other parameters since it's not Data1D
interp_mask = np.ones(self.x.size, dtype=bool)

# update operation parameters of self
self._x_op = self.x[interp_mask]
self._y_op = self.y[interp_mask]
self._dy_op = self.dy[interp_mask] if self.dy is not None else np.zeros(self._y_op.size, dtype=float)
self._dx_op = self.dx[interp_mask] if self.dx is not None else None
self._dxl_op = self.dxl[interp_mask] if self.dxl is not None else None
self._dxw_op = self.dxw[interp_mask] if self.dxw is not None else None
self._lam_op = self.lam[interp_mask] if self.lam is not None else None
self._dlam_op = self.dlam[interp_mask] if self.dlam is not None else None
self_overlap_bool = np.ones(self.x.size, dtype=bool)
self_overlap_index = np.arange(0, self.x.size+1, 1)

# setup _operation Data1D for self
self._operation = self.clone_without_data(self_overlap_index.size)
self._operation.x = self.x[self_overlap_bool]
self._operation.y = self.y[self_overlap_bool]
self._operation.dy = self.dy[self_overlap_bool] if self.dy is not None \
else np.zeros(self._operation.y.size, dtype=float)
self._operation.dx = self.dx[self_overlap_bool] if self.dx is not None else None
self._operation.dxl = self.dxl[self_overlap_bool] if self.dxl is not None else None
self._operation.dxw = self.dxw[self_overlap_bool] if self.dxw is not None else None
self._operation.lam = self.lam[self_overlap_bool] if self.lam is not None else None
self._operation.dlam = self.dlam[self_overlap_bool] if self.dlam is not None else None

def _perform_operation(self, other, operation):
"""
Expand All @@ -915,27 +917,27 @@ def _perform_operation(self, other, operation):
# interpolation will be implemented on the 'other' dataset as needed
self._interpolation_operation(other)

result = self.clone_without_data(self._x_op.size)
result.x = np.copy(self._x_op)
# result.y is initialized as arrays of zero with length of _x_op
# result.dy is initialized as arrays of zero with length of _x_op
result.dx = None if self._dx_op is None else np.copy(self._dx_op)
result.dxl = None if self._dxl_op is None else np.copy(self._dxl_op)
result.dxw = None if self._dxw_op is None else np.copy(self._dxw_op)
result.lam = None if self._lam_op is None else np.copy(self._lam_op)
result.dlam = None if self._dlam_op is None else np.copy(self._dlam_op)
result = self.clone_without_data(self._operation.x.size)
result.x = np.copy(self._operation.x)
# result.y is initialized as arrays of zero with length of _operation.x
# result.dy is initialized as arrays of zero with length of _operation.x
result.dx = None if self._operation.dx is None else np.copy(self._operation.dx)
result.dxl = None if self._operation.dxl is None else np.copy(self._operation.dxl)
result.dxw = None if self._operation.dxw is None else np.copy(self._operation.dxw)
result.lam = None if self._operation.lam is None else np.copy(self._operation.lam)
result.dlam = None if self._operation.dlam is None else np.copy(self._operation.dlam)

for i in range(result.x.size):

a = Uncertainty(self._y_op[i], self._dy_op[i]**2)
a = Uncertainty(self._operation.y[i], self._operation.dy[i]**2)
if isinstance(other, Data1D):
b = Uncertainty(other._y_op[i], other._dy_op[i]**2)
if result.dx is not None and other._dx_op is not None:
result.dx[i] = math.sqrt((self._dx_op[i]**2 + other._dx_op[i]**2) / 2)
if result.dxl is not None and other._dxl_op is not None:
result.dxl[i] = math.sqrt((self._dxl_op[i]**2 + other._dxl_op[i]**2) / 2)
if result.dxw is not None and other._dxw_op is not None:
result.dxw[i] = math.sqrt((self._dxw_op[i]**2 + other._dxw_op[i]**2) / 2)
b = Uncertainty(other._operation.y[i], other._operation.dy[i]**2)
if result.dx is not None and other._operation.dx is not None:
result.dx[i] = math.sqrt((self._operation.dx[i]**2 + other._operation.dx[i]**2) / 2)
if result.dxl is not None and other._operation.dxl is not None:
result.dxl[i] = math.sqrt((self._operation.dxl[i]**2 + other._operation.dxl[i]**2) / 2)
if result.dxw is not None and other._operation.dxw is not None:
result.dxw[i] = math.sqrt((self._operation.dxw[i]**2 + other._operation.dxw[i]**2) / 2)
else:
b = other

Expand Down
Loading

0 comments on commit f39d0ba

Please sign in to comment.