Skip to content

Commit

Permalink
Merge pull request #190 from h2020charisma/fit_peaks
Browse files Browse the repository at this point in the history
Add skewed profiles; Extend Ne reference data
  • Loading branch information
kerberizer authored Feb 2, 2025
2 parents b7550bb + fede7d4 commit e82dbbf
Show file tree
Hide file tree
Showing 10 changed files with 491 additions and 144 deletions.
395 changes: 285 additions & 110 deletions src/ramanchada2/misc/constants.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/ramanchada2/misc/utils/argmin2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def argmin2d(A, median_limit: Optional[float] = None):
x_idx = np.unique(xmin_idx[xmin_idx[ymin_idx[xmin_idx]] == xmin_idx])
y_idx = np.unique(ymin_idx[ymin_idx[xmin_idx[ymin_idx]] == ymin_idx])
dist = A[y_idx, x_idx]
filt = dist < np.median(dist) * median_limit
filt = dist <= np.median(dist) * median_limit
x_idx = x_idx[filt]
y_idx = y_idx[filt]
matches = np.stack([y_idx, x_idx]).T
Expand Down
39 changes: 39 additions & 0 deletions src/ramanchada2/misc/utils/poly_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import numpy as np


def linreg(x, y):
idx = np.isfinite(x) & np.isfinite(y)
x = x[idx]
y = y[idx]
s_xy = np.sum(x*y)
s_x = np.sum(x)
s_xx = np.sum(x**2)
s_y = np.sum(y)
s = len(x)
d = s*s_xx-s_x**2
a = (s*s_xy - s_x*s_y)/d
b = (s_xx*s_y - s_x*s_xy)/d
return a, b


def line_by_points(x, y):
a = (y[1]-y[0])/(x[1]-x[0])
b = y[0] - a*x[0]
return a, b


def poly2_by_points(x, y):
"""
Calculate poly-2 coefficients by 3 points.
>>> x = [1, 2, 4]
>>> y = [1, 0, 4]
>>> a, b, c = poly2_by_points(x, y)
>>> xi = np.linspace(0, 4, 30)
>>> xt = np.array([1, 2, 3, 4])
>>> yt = np.array([1, 0, 1, 4])
>>> assert np.allclose(a*xt**2 + b*xt + c, yt)
"""
a = np.array([np.square(x), x, np.ones_like(x)]).T
a_1 = np.linalg.inv(a)
return a_1@y
52 changes: 52 additions & 0 deletions src/ramanchada2/misc/utils/rough_poly2_calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging

import numpy as np

from .argmin2d import argmin2d
from .poly_params import poly2_by_points
from .significant_peaks import select_peaks

logger = logging.getLogger(__name__)


def _rough_calib_all_to_all(spe_pos, ref_pos, spe_sel_idx, ref_sel_idx):
last = 0
for r1_i, r1 in enumerate(ref_pos[ref_sel_idx]):
for r3_i, r3 in reversed(list(enumerate(ref_pos[ref_sel_idx]))[r1_i+1:]):
for r2_i, r2 in list(enumerate(ref_pos[ref_sel_idx]))[r1_i+1:r3_i-1]:
for s1_i, s1 in enumerate(spe_pos[spe_sel_idx]):
for s3_i, s3 in reversed(list(enumerate(spe_pos[spe_sel_idx]))[s1_i+1:]):
for s2_i, s2 in list(enumerate(spe_pos[spe_sel_idx]))[s1_i+1:s3_i-1]:
a, b, c = poly2_by_points(np.array([s1, s2, s3]), np.array([r1, r2, r3]))
test_spe_pos = spe_pos**2*a + spe_pos*b + c
if np.any(np.diff(test_spe_pos) <= 0):
# should be strictly increasing
continue
w = np.subtract.outer(ref_pos, test_spe_pos)
ridx, sidx = argmin2d(np.abs(w)).T
mer = - len(ridx)
mer = - np.log(len(ridx))
if last > mer:
deviation = np.inf
last = mer
if last == mer:
d = np.average(np.abs(w[ridx, sidx]))
if d < deviation:
deviation = d
logger.info(f'matches={-mer}, deviation={deviation:8.3f}, a={a}, b={b}, c={c}')
print(f'matches={len(ridx)}, deviation={deviation:8.3f}, a={a}, b={b}, c={c}')
lasta = a
lastb = b
lastc = c
return lasta, lastb, lastc


def rough_poly2_calibration(spe_dict, ref_dict, npeaks=10, **kwargs):
spe_pos = np.array(list(spe_dict.keys()))
spe_amp = np.array(list(spe_dict.values()))
ref_pos = np.array(list(ref_dict.keys()))
ref_amp = np.array(list(ref_dict.values()))
npeaks = np.min([npeaks, len(spe_pos), len(ref_pos)])
spe_sel_idx = select_peaks(spe_pos, spe_amp, npeaks=npeaks, **kwargs)
ref_sel_idx = select_peaks(ref_pos, ref_amp, npeaks=npeaks, **kwargs)
return _rough_calib_all_to_all(spe_pos, ref_pos, spe_sel_idx, ref_sel_idx)
16 changes: 16 additions & 0 deletions src/ramanchada2/misc/utils/significant_peaks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import numpy as np


def calc_weight(pos, amp, locality_factor=10):
pos = pos - pos.min()
pos /= pos.max()
m = np.subtract.outer(pos, pos)
m = np.abs(m)
w = amp/((1/np.exp(locality_factor*m)) @ amp)
return w


def select_peaks(pos, amp, npeaks, **kwargs):
w = calc_weight(pos, amp, **kwargs)
idx = np.sort(np.argsort(w)[-npeaks:])
return idx
12 changes: 9 additions & 3 deletions src/ramanchada2/protocols/calib_ne_si_argmin2d_iter_gg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@


def neon_calibration(ne_cm_1: Spectrum,
wl: Literal[514, 532, 633, 785]):
wl: Literal[514, 532, 633, 785],
neon_rough_kw={},
neon_fine_kw={}):
"""
Neon calibration
Expand All @@ -27,8 +29,10 @@ def neon_calibration(ne_cm_1: Spectrum,
"""
ref = rc2const.neon_wl_dict[wl]
ne_nm = ne_cm_1.subtract_moving_minimum(200).shift_cm_1_to_abs_nm_filter(wl).normalize() # type: ignore
ne_nm.x = np.linspace(0, 1, len(ne_nm.x))

ne_cal = ne_nm.xcal_argmin2d_iter_lowpass(ref=ref)
ne_rcal = ne_nm.xcal_rough_poly2_all_pairs(ref=ref, **neon_rough_kw)
ne_cal = ne_rcal.xcal_fine(ref=ref, **neon_fine_kw)
spline = interpolate.Akima1DInterpolator(ne_cm_1.x, ne_cal.x, method='makima')
return spline

Expand Down Expand Up @@ -77,7 +81,7 @@ def silicon_calibration(si_nm: Spectrum,
fitres = si_nm.fit_peak_multimodel(candidates=peaks, **fp_kw) # type: ignore
si_wl = fitres.centers
if len(si_wl) < 1:
raise ValueError('No peaks were found. Please refind find_peaks parameters.')
raise ValueError('No peaks were found. Please refine find_peaks parameters.')
laser_wl = 1/(520.45/1e7 + 1/si_wl)

laser_wl = laser_wl[np.argmin(np.abs(laser_wl-wl))]
Expand Down Expand Up @@ -113,10 +117,12 @@ def neon_silicon_calibration(ne_cm_1: Spectrum,
"""
ne_spline = neon_calibration(ne_cm_1, wl)
si_nm = si_cm_1.scale_xaxis_fun(ne_spline) # type: ignore
si_nm = si_nm.dropna()
si_spline, wl = silicon_calibration(si_nm, wl,
find_peaks_kw=sil_find_kw,
fit_peaks_kw=sil_fit_kw)
ne_nm = ne_cm_1.scale_xaxis_fun(ne_spline) # type: ignore
ne_cal_cm_1 = ne_nm.abs_nm_to_shift_cm_1_filter(wl)
ne_cal_cm_1 = ne_cal_cm_1.dropna()
spline = interpolate.Akima1DInterpolator(ne_cm_1.x, ne_cal_cm_1.x, method='makima')
return spline
77 changes: 54 additions & 23 deletions src/ramanchada2/spectrum/calibration/by_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import lmfit
import numpy as np
import numpy.typing as npt
from pydantic import BaseModel, NonNegativeInt, validate_call
from scipy import interpolate
from scipy import fft, signal
from pydantic import BaseModel, NonNegativeInt, PositiveInt, validate_call
from scipy import fft, interpolate, signal

from ramanchada2.misc.spectrum_deco import (add_spectrum_filter,
add_spectrum_method)

from ...misc import utils as rc2utils
from ...misc.utils.rough_poly2_calibration import rough_poly2_calibration
from ..spectrum import Spectrum


Expand Down Expand Up @@ -179,7 +179,8 @@ def xcal_fine(old_spe: Spectrum,
new_spe: Spectrum, /, *,
ref: Union[Dict[float, float], List[float]],
should_fit=False,
poly_order: NonNegativeInt,
poly_order: NonNegativeInt = 2,
max_iter: NonNegativeInt = 1000,
find_peaks_kw={},
):
"""
Expand All @@ -197,6 +198,8 @@ def xcal_fine(old_spe: Spectrum,
If a dict is provided - wavenumber - amplitude pairs.
If a list is provided - wavenumbers only.
poly_order (NonNegativeInt): polynomial degree to be used usualy 2 or 3
max_iter (NonNegativeInt): max number of iterations for the iterative
polynomial alignment
should_fit (bool, optional): Whether the peaks should be fit or to
associate the positions with the maxima. Defaults to False.
find_peaks_kw (dict, optional): kwargs to be used in find_peaks. Defaults to {}.
Expand All @@ -223,7 +226,7 @@ def cal_func(x, *a):
return [par*(x/1000)**power for power, par in enumerate(a)]

p0 = np.resize([0, 1000, 0], poly_order + 1)
p = rc2utils.align(spe_cent, ref_pos, p0=p0, func=cal_func)
p = rc2utils.align(spe_cent, ref_pos, p0=p0, func=cal_func, max_iter=max_iter)
spe_cal = old_spe.scale_xaxis_fun( # type: ignore
(lambda x, *args: np.sum(cal_func(x, *args), axis=0)), args=p)
new_spe.x = spe_cal.x
Expand Down Expand Up @@ -273,6 +276,25 @@ def xcal_fine_RBF(old_spe: Spectrum,
new_spe.x = interp(old_spe.x.reshape(-1, 1))


def semi_spe_from_dict(deltas: dict, xaxis):
y = np.zeros_like(xaxis)
for pos, ampl in deltas.items():
idx = np.argmin(np.abs(xaxis - pos))
y[idx] += ampl
# remove overflows and underflows
y[0] = 0
y[-1] = 0
return y


def low_pass(x, nbin, window=signal.windows.blackmanharris):
h = window(nbin*2-1)[nbin-1:]
X = fft.rfft(x)
X[:nbin] *= h # apply the window
X[nbin:] = 0 # clear upper frequencies
return fft.irfft(X, n=len(x))


@add_spectrum_filter
@validate_call(config=dict(arbitrary_types_allowed=True))
def xcal_argmin2d_iter_lowpass(old_spe: Spectrum,
Expand All @@ -297,35 +319,44 @@ def xcal_argmin2d_iter_lowpass(old_spe: Spectrum,
number of low-pass steps and their values define the amount of frequencies
to keep. Defaults to [100, 500].
"""
def semi_spe_from_dict(deltas: dict, xaxis):
y = np.zeros_like(xaxis)
for pos, ampl in deltas.items():
idx = np.argmin(np.abs(xaxis - pos))
y[idx] += ampl
# remove overflows and underflows
y[0] = 0
y[-1] = 0
return y

def low_pass(x, nbin, window=signal.windows.blackmanharris):
h = window(nbin*2-1)[nbin-1:]
X = fft.rfft(x)
X[:nbin] *= h # apply the window
X[nbin:] = 0 # clear upper frequencies
return fft.irfft(X, n=len(x))

spe = old_spe.__copy__()
for low_pass_i in low_pass_nfreqs:
xaxis = spe.x
y_ref_semi_spe = semi_spe_from_dict(ref, spe.x)
y_ref_semi_spe = low_pass(y_ref_semi_spe, low_pass_i)

r = xaxis[signal.find_peaks(y_ref_semi_spe)[0]]
r = xaxis[signal.find_peaks(y_ref_semi_spe,
prominence=np.max(y_ref_semi_spe)*.001
)[0]]

spe_low = spe.__copy__()
spe_low.y = low_pass(spe.y, low_pass_i)

spe_cal = spe_low.xcal_fine(ref=r, should_fit=False, poly_order=2)
spe_cal = spe_low.xcal_fine(ref=r, should_fit=False, poly_order=2,
# disables wlen.
# low passed peaks are much broader and
# will be affected by `wlen` so disable.
find_peaks_kw={'wlen': int(xaxis[-1]-xaxis[0])},
)
spe.x = spe_cal.x
spe_cal_fin = spe.xcal_fine(ref=ref, should_fit=False, poly_order=2)
new_spe.x = spe_cal_fin.x


@add_spectrum_filter
@validate_call(config=dict(arbitrary_types_allowed=True))
def xcal_rough_poly2_all_pairs(old_spe: Spectrum,
new_spe: Spectrum, /, *,
ref: Dict[float, float],
prominence: Optional[float] = None,
npeaks: PositiveInt = 10,
**kwargs,
):
if prominence is None:
prominence = old_spe.y_noise_MAD()*5
cand = old_spe.find_peak_multipeak(prominence=prominence) # type: ignore
spe_dict = cand.get_pos_ampl_dict()

a, b, c = rough_poly2_calibration(spe_dict, ref, npeaks=npeaks, **kwargs)
new_spe.x = a*old_spe.x**2 + b*old_spe.x + c
32 changes: 26 additions & 6 deletions src/ramanchada2/spectrum/peaks/fit_peaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
from ..spectrum import Spectrum

logger = logging.getLogger(__name__)
available_models = ['Gaussian', 'Lorentzian', 'Moffat', 'Voigt', 'PseudoVoigt', 'Pearson4', 'Pearson7']
available_models_type = Literal['Gaussian', 'Lorentzian', 'Moffat', 'Voigt', 'PseudoVoigt', 'Pearson4', 'Pearson7']
available_models = ['Gaussian', 'Skewed Gaussian', 'Lorentzian', 'Moffat',
'Voigt', 'Skewed Voigt', 'PseudoVoigt', 'Pearson4', 'Pearson7']
available_models_type = Literal['Gaussian', 'Skewed Gaussian', 'Lorentzian', 'Moffat',
'Voigt', 'Skewed Voigt', 'PseudoVoigt', 'Pearson4', 'Pearson7']


@validate_call(config=dict(arbitrary_types_allowed=True))
Expand All @@ -41,13 +43,30 @@ def build_multipeak_model_params(profile: Union[available_models_type, List[avai
fit_params[f'p{peak_i}_amplitude'].set(value=peak.amplitude/height_factor)
fit_params[f'p{peak_i}_beta'].set(value=1, min=1e-4, max=10)
fit_params[f'p{peak_i}_sigma'].set(value=peak.sigma)
fit_params[f'p{peak_i}_fwhm'].set(min=peak.fwhm*.4, max=peak.fwhm*2)

elif profile == 'Voigt':
fwhm_factor = 3.6013
height_factor = 1/peak.sigma/2
fit_params[f'p{peak_i}_amplitude'].set(value=peak.amplitude/height_factor)
fit_params[f'p{peak_i}_gamma'].set(value=peak.sigma/fwhm_factor, vary=True)
fit_params[f'p{peak_i}_sigma'].set(value=peak.sigma/fwhm_factor)
fit_params[f'p{peak_i}_fwhm'].set(min=peak.fwhm*.4, max=peak.fwhm*2)

elif profile == 'Skewed Voigt':
fwhm_factor = 3.6013
height_factor = 1/peak.sigma/2
fit_params[f'p{peak_i}_amplitude'].set(value=peak.amplitude/height_factor)
fit_params[f'p{peak_i}_gamma'].set(value=peak.sigma/fwhm_factor, vary=True)
fit_params[f'p{peak_i}_sigma'].set(value=peak.sigma/fwhm_factor)
fit_params[f'p{peak_i}_skew'].set(value=0)

elif profile == 'Skewed Gaussian':
fwhm_factor = lmfit_models['Gaussian'].fwhm_factor
height_factor = lmfit_models['Gaussian'].height_factor/peak.sigma
fit_params[f'p{peak_i}_amplitude'].set(value=peak.amplitude/height_factor)
fit_params[f'p{peak_i}_sigma'].set(value=peak.sigma)
fit_params[f'p{peak_i}_gamma'].set(value=peak.sigma)

elif profile == 'PseudoVoigt':
fwhm_factor = lmfit_models[profile].fwhm_factor
Expand All @@ -57,8 +76,9 @@ def build_multipeak_model_params(profile: Union[available_models_type, List[avai

elif profile == 'Pearson4':
fwhm_factor = 1
height_factor = 1/peak.sigma/3
# p{peak_i}_amplitude or p{peak_i}_height
fit_params[f'p{peak_i}_amplitude'].set(value=peak.amplitude)
fit_params[f'p{peak_i}_amplitude'].set(value=peak.amplitude/height_factor)
fit_params[f'p{peak_i}_sigma'].set(value=peak.sigma/fwhm_factor)

elif profile == 'Pearson7':
Expand All @@ -69,13 +89,13 @@ def build_multipeak_model_params(profile: Union[available_models_type, List[avai

else:
fwhm_factor = lmfit_models[profile].fwhm_factor
height_factor = lmfit_models[profile].height_factor/peak.sigma/2
height_factor = lmfit_models[profile].height_factor/peak.sigma
fit_params[f'p{peak_i}_amplitude'].set(value=peak.amplitude/height_factor)
fit_params[f'p{peak_i}_sigma'].set(value=peak.sigma)
fit_params[f'p{peak_i}_fwhm'].set(min=peak.fwhm*.4, max=peak.fwhm*2)
fit_params[f'p{peak_i}_height'].set(min=peak.amplitude*.1, max=peak.amplitude*20)

fit_params[f'p{peak_i}_amplitude'].set(min=0)
fit_params[f'p{peak_i}_fwhm'].set(min=peak.fwhm*.4, max=peak.fwhm*2)
fit_params[f'p{peak_i}_height'].set(min=peak.amplitude*.1, max=peak.amplitude*20)
fit_params[f'p{peak_i}_center'].set(value=peak.position)

return fit_model, fit_params
Expand Down
8 changes: 8 additions & 0 deletions tests/peak/test_find_peaks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import numpy as np

import ramanchada2 as rc2


def test_empty_spectrum():
spe = rc2.spectrum.Spectrum(x=np.arange(100), y=np.zeros(100))
spe.find_peak_multipeak()
2 changes: 1 addition & 1 deletion tests/protocols/test_calibrationmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def compare_calibrated_spe(setup_module, spectra, name="calibration"):
plt.savefig("test_calmodel_{}.png".format(name))
print(name, np.mean(cos_sim_matrix_original), np.mean(cos_sim_matrix))
# assert(np.mean(cos_sim_matrix_original) <= np.mean(cos_sim_matrix))
assert np.mean(cos_sim_matrix_original) <= (np.mean(cos_sim_matrix) + 1e-4)
assert np.mean(cos_sim_matrix_original) <= (np.mean(cos_sim_matrix) + 1.2e-4)


def test_xcalibration_pst(setup_module):
Expand Down

0 comments on commit e82dbbf

Please sign in to comment.