Skip to content

Commit

Permalink
compute attenuation graph
Browse files Browse the repository at this point in the history
  • Loading branch information
RobBuchananCompPhys committed Dec 19, 2024
1 parent 38416fc commit 232ee15
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 48 deletions.
52 changes: 32 additions & 20 deletions MDANSE/Src/MDANSE/Mathematics/Signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from enum import Enum
from collections import namedtuple
from functools import partial
from typing import Tuple

import scipy.signal
from MDANSE.Mathematics.Arithmetic import weight
Expand Down Expand Up @@ -332,16 +333,15 @@ class Filter:
# Frequency response of filter
_freq_response = None

# Stores a custom frequency range as a linear series
# Stores a custom frequency range around which to compute the filter frequency response, as a linear series
_custom_freq_range = None

class FrequencyRangeMethod(Enum):
"""
"""
Auto: int = 0,
Custom: int = 0,
FFT: int = 1,
Custom: int = 2

def __init__(self, **kwargs):
if not hasattr(self, 'default_settings'):
Expand Down Expand Up @@ -396,17 +396,24 @@ def freq_response(self) -> FrequencyDomain:
return self._freq_response

@freq_response.setter
def freq_response(self, expr: TransferFunction, custom_range: np.ndarray=None) -> None:
def freq_response(self, params: Tuple[TransferFunction, FrequencyRangeMethod]) -> None:
"""Calculates the frequency response of the filter from the filter's transfer function numerator and denominator coefficients.
:Parameters:
#. expr (np.array): the rational polynomial expression for the filter transfer function, in terms of its numerator and denominator coefficients
"""
if (custom_range and isinstance(custom_range, np.ndarray)):
range = custom_range
expr, method = params
methods = self.__class__.FrequencyRangeMethod

if method is methods.FFT:
freq_range = self.frequency_range(self.n_steps, self.sample_freq**(-1))
elif (self.custom_freq_range.any() and method is methods.Custom):
#
freq_range = self.custom_freq_range
else:
range = self.frequency_range(self.n_steps, self.sample_freq**(-1))
freqs = signal.freqs(*expr, worN=np.abs(range))
RuntimeError(f"Could not find supplied frequency range around which filter frequency response will be computed. \nPlease set the 'custom_freq_range' property on the instance of {self.__class__}")

freqs = signal.freqs(*expr, worN=np.abs(freq_range))
self._freq_response = self.FrequencyDomain(*freqs)

@property
Expand Down Expand Up @@ -556,7 +563,7 @@ def energy_to_freq(cls, energy: float | np.ndarray) -> float | np.ndarray:
energy (meV) to frequency (pHz)
"""
if isinstance(energy, list):
return (np.array(energy) * 1/((2*np.pi)**(-1) * cls._freq_to_mev)) * 1e-12
return (np.array(energy) * 1/(cls._freq_to_mev)) * 1e-12

return (energy * 1/((2*np.pi)**(-1) * cls._freq_to_mev)) * 1e-12

Expand Down Expand Up @@ -592,7 +599,7 @@ def __init__(self, **kwargs):
self.coeffs = self.TransferFunction(
*signal.butter(self.order, self.cutoff_freq, btype=self.attenuation_type, analog=True, output='ba')
)
self.freq_response = self.coeffs
self.freq_response = (self.coeffs, self.__class__.FrequencyRangeMethod.FFT)


class ChebyshevTypeI(Filter):
Expand Down Expand Up @@ -630,7 +637,7 @@ def __init__(self, **kwargs):
self.coeffs = self.TransferFunction(
*signal.cheby1(self.order, self.max_ripple, self.cutoff_freq, btype=self.attenuation_type, analog=True, output='ba')
)
self.freq_response = self.coeffs
self.freq_response = (self.coeffs, self.__class__.FrequencyRangeMethod.FFT)


class ChebyshevTypeII(Filter):
Expand Down Expand Up @@ -668,7 +675,7 @@ def __init__(self, **kwargs):
self.coeffs = self.TransferFunction(
*signal.cheby2(self.order, self.min_attenuation, self.cutoff_freq, btype=self.attenuation_type, analog=True, output='ba')
)
self.freq_response = self.coeffs
self.freq_response = (self.coeffs, self.__class__.FrequencyRangeMethod.FFT)


class Elliptical(Filter):
Expand Down Expand Up @@ -710,7 +717,7 @@ def __init__(self, **kwargs):
self.coeffs = self.TransferFunction(
*signal.ellip(self.order, self.max_ripple, self.min_attenuation, self.cutoff_freq, btype=self.attenuation_type, analog=True, output='ba')
)
self.freq_response = self.coeffs
self.freq_response = (self.coeffs, self.__class__.FrequencyRangeMethod.FFT)


class Bessel(Filter):
Expand Down Expand Up @@ -749,7 +756,7 @@ def __init__(self, **kwargs):
self.coeffs = self.TransferFunction(
*signal.bessel(self.order, self.cutoff_freq, btype=self.attenuation_type, analog=True, output='ba', norm=self.norm)
)
self.freq_response = self.coeffs
self.freq_response = (self.coeffs, self.__class__.FrequencyRangeMethod.FFT)


class Notch(Filter):
Expand Down Expand Up @@ -777,7 +784,7 @@ def __init__(self, **kwargs):
self.coeffs = self.TransferFunction(
*signal.iirnotch(self.fundamental_freq, self.quality_factor)
)
self.freq_response = self.coeffs
self.freq_response = (self.coeffs, self.__class__.FrequencyRangeMethod.FFT)


class Peak(Filter):
Expand Down Expand Up @@ -805,7 +812,7 @@ def __init__(self, **kwargs):
self.coeffs = self.TransferFunction(
*signal.iirpeak(self.fundamental_freq, self.quality_factor)
)
self.freq_response = self.coeffs
self.freq_response = (self.coeffs, self.__class__.FrequencyRangeMethod.FFT)


class Comb(Filter):
Expand Down Expand Up @@ -843,7 +850,7 @@ def __init__(self, **kwargs):
self.coeffs = self.TransferFunction(
*signal.iircomb(self.fundamental_freq, self.quality_factor, ftype=self.comb_type, pass_zero=self.pass_zero)
)
self.freq_response = self.coeffs
self.freq_response = (self.coeffs, self.__class__.FrequencyRangeMethod.FFT)

FILTERS = (Butterworth, ChebyshevTypeI, ChebyshevTypeII, Elliptical, Bessel, Notch, Peak, Comb)

Expand Down Expand Up @@ -934,6 +941,8 @@ def power_spectrum(
os.chdir('..\\..\\..\\')

data = numpy.loadtxt("Tests\\UnitTests\\TrajectoryFilter\\methane_hydrogen_position.csv")
ps = ()
ps_raw = ()

N = 10000
fs = 200.0
Expand All @@ -949,8 +958,11 @@ def power_spectrum(

import matplotlib.pyplot as plt

x_raw = np.linspace(0, 646, 160)
x = np.linspace(0, 646, 1000)

plt.figure(figsize=(10, 6))
plt.plot(pre_w, pre_amplitudes, label="pre-filter")
plt.plot(x_raw, ps_raw, label="RAW POWER SPECTRUM")
plt.show()

f = filter_class(**{
Expand All @@ -973,7 +985,7 @@ def power_spectrum(
post_amplitudes = (2 * (np.abs(post_filter_freqs["h"])) / N)[:np.int32(N/2)]

plt.figure(figsize=(10, 6))
plt.plot(post_w, post_amplitudes, label="post-filter")
plt.plot(x, ps, label="UPSAMPLED POWER SPECTRUM")
plt.show()

# w0 = 2 * np.pi * 1.5
Expand All @@ -998,7 +1010,7 @@ def power_spectrum(

import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
# plt.plot(pre_w, pre_amplitudes, label="pre-filter")
#plt.plot(pre_w, pre_amplitudes, label="pre-filter")
plt.show()

#import matplotlib.pyplot as plt
Expand Down
74 changes: 46 additions & 28 deletions MDANSE_GUI/Src/MDANSE_GUI/InputWidgets/TrajectoryFilterWidget.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import copy
from typing import Tuple

import numpy as np
from scipy import signal
Expand Down Expand Up @@ -62,7 +63,7 @@ class FilterDesigner(QDialog):
_setting_grid_layout = QGridLayout()
_preferences_grid_layout = QGridLayout()
_preferences = dict()
_trajectory_attenuation = None
_trajectory_power_spectrum = None

def __init__(self, field: QLineEdit, configurator: TrajectoryFilterConfigurator, parent, *args, **kwargs):
"""
Expand Down Expand Up @@ -204,8 +205,8 @@ def edit_preferences(self, key: str, value: any) -> None:
self._preferences.update({key: value})

# Load trajectory attenuation
if key == "show_attenuation" and not self._trajectory_attenuation:
self._trajectory_attenuation = power_spectrum(
if key == "show_attenuation" and not self._trajectory_power_spectrum:
self._trajectory_power_spectrum = power_spectrum(
self.find_configuration_property("trajectory"),
self.find_configuration_property("frames"),
self.find_configuration_property("projection"),
Expand All @@ -217,6 +218,34 @@ def edit_preferences(self, key: str, value: any) -> None:

self.render_canvas_assets()

def set_trajectory_power_spectrum(self, filter: Filter) -> Tuple[np.ndarray, np.ndarray]:
"""
"""
response = filter.freq_response

# Lambda to resample and normalise input values
values = lambda a, new_len: signal.resample(a, new_len) * (a.max() ** (-1))

# Trajectory power spectrum data
raw_power_spectrum = copy.deepcopy(self._trajectory_power_spectrum)
raw_power_spectrum_energies, raw_power_spectrum_values = raw_power_spectrum

# Resample trajectory power spectrum energies (x-axis)
power_spectrum_energies = np.linspace(raw_power_spectrum_energies.min(), raw_power_spectrum_energies.max(), len(response.frequencies))

# Resample and normalise trajectory power spectrum (y-axis)
ps = values(raw_power_spectrum_values, len(response.frequencies))

# Compute power spectral attenuation due to filter (multiplicative)
attenuated_ps = ps * response.magnitudes

# Set custom frequency range on filter object
filter.custom_freq_range = Filter.energy_to_freq(power_spectrum_energies)
filter.freq_response = (filter.coeffs, Filter.FrequencyRangeMethod.Custom)

return (ps, attenuated_ps)

def setting_to_widget(self, setting_key: str, val_group: dict) -> QWidget:
"""Converts the setting dictionary to the corresponding setting widget and sets up connections.
Expand Down Expand Up @@ -327,7 +356,6 @@ def make_settings_grid(self, filter: Filter, grid: QGridLayout) -> None:
self._setting_grid_layout = QGridLayout()
self.update_filter(filter.__name__)


def toggle_bound_frequencies(self, on: bool=True):
"""
Expand Down Expand Up @@ -436,6 +464,7 @@ def render_graph(self,
freqs: Filter.FrequencyDomain=TrajectoryFilterConfigurator._filter.freq_response,
db_response: bool=False,
energies: bool=False,
trajectory_power_spectrum: Tuple[np.ndarray, np.ndarray]=None,
show_attenuation: bool=False) -> None:
"""Renders the graph of the designed filter frequency response.
Expand All @@ -450,30 +479,14 @@ def render_graph(self,

x = freqs.frequencies

# Check if we are displaying trajectory power spectral attenuation alongside filter response
if show_attenuation:
values = lambda v, new_len: signal.resample(v, new_len) * (v.max()**(-1))
raw_power_spectrum = copy.deepcopy(self._trajectory_attenuation)
raw_power_spectrum_energies, raw_power_spectrum_values = raw_power_spectrum

# Resample trajectory power spectrum energies (x-axis)
power_spectrum_energies = np.linspace(raw_power_spectrum_energies.min(), raw_power_spectrum_energies.max(), len(x))

# Resample trajectory power spectrum (y-axis)
ps = values(raw_power_spectrum_values, len(x))

# Compute power spectral attenuation due to filter (multiplicative)
attenuated_ps = ps * freqs.magnitudes

power_spectrum_values = 20 * np.log10(abs(ps)) if db_response else ps
attenuation_values = 20 * np.log10(abs(attenuated_ps)) if db_response else attenuated_ps

axes = self._figure.add_axes([0.1, 0.1, 0.8, 0.8])
axes.plot(
x,
20 * np.log10(abs(freqs.magnitudes)) if db_response else freqs.magnitudes,
label="Filter response"
)
axes.plot(x, 20 * np.log10(abs(freqs.magnitudes)) if db_response else freqs.magnitudes, label="Filter response")

# Conditionally display trajectory power spectral attenuation
if trajectory_power_spectrum:
ps, attenuated_ps = trajectory_power_spectrum
axes.plot(x, 20 * np.log10(abs(ps)) if db_response else ps, label="Trajectory response", color="grey")
axes.plot(x, 20 * np.log10(abs(attenuated_ps)) if db_response else attenuated_ps, label="Attenuation", color="black")

# Conditionally convert frequencies (pHz) to energies (meV)
if energies:
Expand All @@ -483,6 +496,7 @@ def render_graph(self,
axes.set_xlabel("Energy (meV)" if energies else "Frequency (pHz)")
axes.set_ylabel("Magnitude (dB)" if db_response else "Amplitude")

axes.legend(loc="best")
axes.grid(True)

self._figure.canvas.draw()
Expand Down Expand Up @@ -528,10 +542,14 @@ def render_canvas_assets(self) -> None:
filter_class = filter_map[self._settings["filter"]]
filter_preview = filter_class(**self._settings["attributes"])

# Check if we are displaying trajectory power spectral attenuation alongside filter response
if show_attenuation:
ps, attenuated_ps = self.set_trajectory_power_spectrum(filter_preview)

numerator, denominator = filter_preview.to_digital_coeffs() if not analog_filter else filter_preview.coeffs

# Render the filter graph and text
self.render_graph(filter_preview.freq_response, db_response=db_response, energies=energies, show_attenuation=show_attenuation)
self.render_graph(filter_preview.freq_response, db_response=db_response, energies=energies, trajectory_power_spectrum=(ps, attenuated_ps) if show_attenuation else None)#, show_attenuation=show_attenuation)
self.render_graph_text(
filter_class.rational_polynomial_string(numerator, denominator, analog=analog_filter),
self._settings["attributes"].get("cutoff_freq", DEFAULT_FILTER_CUTOFF),
Expand Down

0 comments on commit 232ee15

Please sign in to comment.